{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Kubeflow pipelines\n",
    "\n",
    "**Learning Objectives:**\n",
    "  1. Learn how to deploy a Kubeflow cluster on GCP\n",
    "  1. Learn how to create a experiment in Kubeflow\n",
    "  1. Learn how to package you code into a Kubeflow pipeline\n",
    "  1. Learn how to run a Kubeflow pipeline in a repeatable and traceable way\n",
    "\n",
    "\n",
    "## Introduction\n",
    "\n",
    "In this notebook, we will first setup a Kubeflow cluster on GCP.\n",
    "Then, we will create a Kubeflow experiment and a Kubflow pipeline from our taxifare machine learning code. At last, we will run the pipeline on the Kubeflow cluster, providing us with a reproducible and traceable way to execute machine learning code."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!sudo chown -R jupyter:jupyter /home/jupyter/training-data-analyst"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pip freeze | grep kfp || pip install kfp"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from os import path\n",
    "\n",
    "import kfp\n",
    "import kfp.compiler as compiler\n",
    "import kfp.components as comp\n",
    "import kfp.dsl as dsl\n",
    "import kfp.gcp as gcp\n",
    "import kfp.notebook"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Setup a Kubeflow cluster on GCP"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**TODO 1**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To deploy a [Kubeflow](https://www.kubeflow.org/) cluster\n",
    "in your GCP project, use the [AI Platform pipelines](https://console.cloud.google.com/ai-platform/pipelines):\n",
    "\n",
    "1. Go to [AI Platform Pipelines](https://console.cloud.google.com/ai-platform/pipelines) in the GCP Console.\n",
    "1. Create a new instance\n",
    "2. Hit \"Configure\"\n",
    "3. Check the box \"Allow access to the following Cloud APIs\"\n",
    "1. Hit \"Create New Cluster\"\n",
    "4. Hit \"Deploy\"\n",
    "\n",
    "When the cluster is ready, go back to the AI Platform pipelines page and click on \"SETTINGS\" entry for your cluster.\n",
    "This will bring up a pop up with code snippets on how to access the cluster \n",
    "programmatically. \n",
    "\n",
    "Copy the \"host\" entry and set the \"HOST\" variable below with that.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "HOST = # TODO: fill in the HOST information for the cluster\n",
    "BUCKET = # TODO: fill in the GCS bucket"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create an experiment"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**TODO 2**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We will start by creating a Kubeflow client to pilot the Kubeflow cluster:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "client = # TODO: create a Kubeflow client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's look at the experiments that are running on this cluster. Since you just launched it, you should see only a single \"Default\" experiment:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "client.list_experiments()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's create a 'taxifare' experiment where we could look at all the various runs of our taxifare pipeline:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "exp = # TODO: create an experiment called 'taxifare'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's make sure the experiment has been created correctly:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "client.list_experiments()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Packaging your code into Kubeflow components"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We have packaged our taxifare ml pipeline into three components:\n",
    "* `./components/bq2gcs` that creates the training and evaluation data from BigQuery and exports it to GCS\n",
    "* `./components/trainjob` that launches the training container on AI-platform and exports the model\n",
    "* `./components/deploymodel` that deploys the trained model to AI-platform as a REST API\n",
    "\n",
    "Each of these components has been wrapped into a Docker container, in the same way we did with the taxifare training code in the previous lab.\n",
    "\n",
    "If you inspect the code in these folders, you'll notice that the `main.py` or `main.sh` files contain the code we previously executed in the notebooks (loading the data to GCS from BQ, or launching a training job to AI-platform, etc.). The last line in the `Dockerfile` tells you that these files are executed when the container is run. \n",
    "So we just packaged our ml code into light container images for reproducibility. \n",
    "\n",
    "We have made it simple for you to build the container images and push them to the Google Cloud image registry gcr.io in your project:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Builds the taxifare trainer container in case you skipped the optional part of lab 1\n",
    "!taxifare/scripts/build.sh"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Pushes the taxifare trainer container to gcr/io\n",
    "!taxifare/scripts/push.sh"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Builds the KF component containers and push them to gcr/io\n",
    "!cd pipelines && make components"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now that the container images are pushed to the [registry in your project](https://console.cloud.google.com/gcr), we need to create yaml files describing to Kubeflow how to use these containers. It boils down essentially to\n",
    "* describing what arguments Kubeflow needs to pass to the containers when it runs them\n",
    "* telling Kubeflow where to fetch the corresponding Docker images\n",
    "\n",
    "In the cells below, we have three of these \"Kubeflow component description files\", one for each of our components."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**TODO 3**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**IMPORTANT: Modify the image URI in the cell \n",
    "below to reflect that you pushed the images into the gcr.io associated with your project.**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile bq2gcs.yaml\n",
    "\n",
    "name: bq2gcs\n",
    "    \n",
    "description: |\n",
    "    This component creates the training and\n",
    "    validation datasets as BiqQuery tables and export\n",
    "    them into a Google Cloud Storage bucket at\n",
    "    gs://<BUCKET>/taxifare/data.\n",
    "        \n",
    "inputs:\n",
    "    - {name: Input Bucket , type: String, description: 'GCS directory path.'}\n",
    "\n",
    "implementation:\n",
    "    container:\n",
    "        image: # TODO: Reference the image URI for taxifare-bq2gcs you just created\n",
    "        args: [\"--bucket\", {inputValue: Input Bucket}]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile trainjob.yaml\n",
    "\n",
    "name: trainjob\n",
    "    \n",
    "description: |\n",
    "    This component trains a model to predict that taxi fare in NY.\n",
    "    It takes as argument a GCS bucket and expects its training and\n",
    "    eval data to be at gs://<BUCKET>/taxifare/data/ and will export\n",
    "    the trained model at  gs://<BUCKET>/taxifare/model/.\n",
    "        \n",
    "inputs:\n",
    "    - {name: Input Bucket , type: String, description: 'GCS directory path.'}\n",
    "\n",
    "implementation:\n",
    "    container:\n",
    "        image: # TODO: Reference the image URI for taxifare-trainjob you just created\n",
    "        args: [{inputValue: Input Bucket}]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile deploymodel.yaml\n",
    "\n",
    "name: deploymodel\n",
    "    \n",
    "description: |\n",
    "    This component deploys a trained taxifare model on GCP as taxifare:dnn.\n",
    "    It takes as argument a GCS bucket and expects the model to deploy \n",
    "    to be found at gs://<BUCKET>/taxifare/model/export/savedmodel/\n",
    "        \n",
    "inputs:\n",
    "    - {name: Input Bucket , type: String, description: 'GCS directory path.'}\n",
    "\n",
    "implementation:\n",
    "    container:\n",
    "        image: # TODO: Reference the image URI for taxifare-deployment you just created\n",
    "        args: [{inputValue: Input Bucket}]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a Kubeflow pipeline"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The code below creates a kubeflow pipeline by decorating a regular function with the\n",
    "`@dsl.pipeline` decorator. Now the arguments of this decorated function will be\n",
    "the input parameters of the Kubeflow pipeline.\n",
    "\n",
    "Inside the function, we describe the pipeline by\n",
    "* loading the yaml component files we created above into a Kubeflow `op`\n",
    "* specifying the order into which the Kubeflow ops should be run"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# TODO 3\n",
    "PIPELINE_TAR = 'taxifare.tar.gz'\n",
    "BQ2GCS_YAML = './bq2gcs.yaml'\n",
    "TRAINJOB_YAML = './trainjob.yaml'\n",
    "DEPLOYMODEL_YAML = './deploymodel.yaml'\n",
    "\n",
    "\n",
    "@dsl.pipeline(\n",
    "    name='Taxifare',\n",
    "    description='Train a ml model to predict the taxi fare in NY')\n",
    "def pipeline(gcs_bucket_name='<bucket where data and model will be exported>'):\n",
    "\n",
    "    bq2gcs_op = comp.load_component_from_file(BQ2GCS_YAML)\n",
    "    bq2gcs = bq2gcs_op(\n",
    "        input_bucket=gcs_bucket_name,\n",
    "    )\n",
    "\n",
    "    trainjob_op = # TODO: Load the yaml file for training\n",
    "    trainjob = # TODO: Add your code to run the training job\n",
    "    )\n",
    "\n",
    "    deploymodel_op = # TODO: Load the yaml file for deployment\n",
    "    deploymodel = # TODO: Addd your code to run model deployment\n",
    "    )\n",
    "\n",
    "    # TODO: Add the code to run 'trainjob' after 'bq2gcs' in the pipeline\n",
    "    # TODO: Add the code to run 'deploymodel' after 'trainjob' in the pipeline"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The pipeline function above is then used by the Kubeflow compiler to create a Kubeflow pipeline artifact that can be either uploaded to the Kubeflow cluster from the UI, or programatically, as we will do below:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# TODO: Compile the pipeline functon above"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ls $PIPELINE_TAR"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you untar and uzip this pipeline artifact, you'll see that the compiler has transformed the\n",
    "Python description of the pipeline into yaml description!\n",
    "\n",
    "Now let's feed Kubeflow with our pipeline and run it using our client:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# TODO 4\n",
    "run = client.run_pipeline(\n",
    "    experiment_id= # TODO: Add code for experiment id\n",
    "    job_name= # TODO: Provide a jobname\n",
    "    pipeline_package_path= # TODO: Add code for pipeline zip file\n",
    "    params={\n",
    "        'gcs_bucket_name': BUCKET,\n",
    "    },\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Have a look at the link to monitor the run. If the pipeline run fails, then create the model resource first by using `gcloud ai-platform models create taxifare`. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now all the runs are nicely organized under the experiment in the UI, and new runs can be either manually launched or scheduled through the UI in a completely repeatable and traceable way!"
   ]
  }
 ],
 "metadata": {
  "environment": {
   "name": "tf2-gpu.2-1.m49",
   "type": "gcloud",
   "uri": "gcr.io/deeplearning-platform-release/tf2-gpu.2-1:m49"
  },
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
